kafka - scala 语言写的 版本 1.0.0 scala 2.11 官方推荐
kafka 是什么?
1.kafka是一个消息队列(生产者消费者模式)
2.目标:构建企业中统一的、高通量、低延时的消息平台
3.大多的是消息队列(消息中间件)都是基于JMS标准实现的,Kafka类似于JMS的实现
kafka 有什么用?(消息队列有什么用?)
作为缓冲,来异构、解耦系统
a. 用户注册需要多个步骤,每个步骤执行都需要很长时间,代表用户等待时间是所有步骤的累计时间
b. 为了减少用户等待的时间,使用并行执行,有多少步骤,就开启多少个线程来执行
代表用户等待时间是所有步骤中耗时最多的那个步骤时间
c. 问题:开启多个线程执行每个步骤,如果以一个步骤执行异常,或者严重超时,
用户的等待时间就不可控了
使用消息队列来保证
1.注册时,立刻返回成功
2.发送注册成功的消息到消息平台
3.对注册信息感兴趣的程序,可以消费消息
kafka的基本架构
kafka cluster:由多个服务器组成,每个服务器单独的名字broker(server)
kafka producer:生产者、负责生产数据
kafka consumer:消费者、负责消费数据
kafka topic:主题,一类消息的名称。存储数据时将一类数据存放在某个topic下,消费数据也是消费一类数据
订单系统:创建一个topic,叫做order
用户系统:创建一个topic,叫做user
商品系统:创建一个topic,叫做product
配置kafka需要修改配置文件的三个地方:
1.broker.id
2.数据存放的目录,注意:目录如果不存在,需要新建
3.zookeeper的地址信息
查看kafka集群
由于kafka集群没有UI界面,需要借助外部工具,来查看kafka的集群
这个工具是一个java程序,必须要安装好jdk --- ZooInspector
1) 创建一个订单的topic。
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order
2)编写代码启动一个生产者,生产数据
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic order
3) 编写代码启动给一个消费者,消费数据
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic order
kafka原理
1.分片与副本机制
分片:
当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,
存放在多台服务器上。每个服务器上的数据,叫做一个分片。
问题:
如果一个partition中有10T数据,如何存放?是放在一个文件还是多个文件?
kafka的解决方案是多个文件!
这里说的多个文件就是segment段
[我们在kafka配置文件中配置的数据存储目录:/export/data/kafka]
里面有topicname-index [如:order-0,意思是order这个topic的第0个副本]
这个order-0目录下就是存放着诸如:
00000000000000000.index
00000000000000000.log
segment段包含了这两个文件,segment段默认是1G大小
segment段中有两个核心的文件.log和.index,当log文件等于1G的时候,
新的数据会写入到下个segment中,同时我们也可以看到segment段中有.timeindex文件生成,
而且通过查看,可以看到一个segment段差不多会存储70万条数据。
如上图所述:
*Segment文件命名规则:
partition全局的第一个segment从0开始,后续每个sgment文件名为上一个segment文件最后一条消息的offset值。
数值最大的为64位long大小,19位数字字符长度,没有数字用0填充。
*索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中
message的物理偏移地址
结论:
kafka查找segment file 只需要两步
1.先找到这个数据对应的segment[通过查看segment的区间,offset在哪个segment段中]
2.根据某个segment段中的.index 索引文件中查找该条数据所在.log文件中的位置
kafka为什么要对文件进行切分,保存多个文件中?
kafka作为消息中间件,只负责消息的临时存储,并不是永久存储,
需要删除过期的数据。
如果将所有的数据都存放在一个文件中,要删除过期数据的时候,就麻烦了。
因为文件有日期属性,删除过期数据,只需要根据文件的日属性删除就好了
副本:
当数据只保存一份的时候,有丢失的风险,为了更好的容错和容灾,
将数据拷贝几份,保存到不同的机器上。
kafka生产数据的分发策略
kafka在生产数据的实惠,有一个数据分发策略。默认的情况使用DefaultPartitoner.class类
这个类就定义数据分发的策略。
1.如果用指定partition,生产就不会条用DefaultPartitoner.partition()方法,直接发到指定的分区
[这种不常用!]
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
2.当用户指定key,使用hash算法。如果key一直不变,同一个key算出来的hash值是一个固定值。
如果是固定值,这种hash取模就没意义。
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
3.当用户既没有指定partition也没有指定key时,使用轮询[round-robin]的方式发送数据
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
kafka-消费者的负载均衡
举例说明:[问题重现]
当每秒钟有400条数据过来,分了3个partition来存储,但是只有1个消费者并且每秒能消费100条,这样的话,生产者的速度很快,但是消费者跟不上,怎么办?
造成了数据大量滞后和延时!
解决:
多几个消费者,共同来消费数据
比如3个消费者来共同消费数据这样就解决了。
新的问题:
消费组中消费者的数量和partition的数量一致,但是消费者消费的熟读还是跟不上[比如每个消费者只能消费100条],怎么办?
再加个消费者吗? 答案是 no!!!!
因为根据kafka负载均衡策略规定,多出来的笑着是处于空闲状态的!
也就是1个partition只能被1个消费者消费
真是解决办法:
要么修改topic的partition数量;
要么减少消费者处理时间,提高处理速度;
kafka消息不丢失机制:
1.producer端消息不丢失机制:
如果有多个副本,就需要选择一个leader出来,负责消息的读写请求。
比如:
有一条数据经过partitioner.class计算把数据发送给了broker2[producerRecord ---> 2]
关于ack的响应有3个状态值:
0:生产者只管发数据,并不关心数据是否丢失
1:partition的leader收到数据后,就返回响应码状态
-1:所有的从节点和leader都收到数据后,才返回响应码状态
问题:
如果broker端一直不给ack状态码,producer永远不知道是否成功。
producer可以设置一个超时时间 10s,超过时间就认为失败。
问题又来了:
如果一条消息发送一次,得到一次ack相应,在大量数据情况下会占用很多带宽怎么办?
解决:
生产者将数据线缓存到producer端,达到一定的数量阈值或者时间阈值之后发送
[比如:设置缓冲池中可以放2万条数据,或者等待时间设置成500ms]
问题:
如果设置buffer,按照500条每个批次发送数据到broker,但是broker迟迟不给相应,buffer中的数据如何处理?
[而且producer端还源源不断的生产数据,这时候就造成了阻塞情况]
解决:
可以对buffer进行设置,如果满了,并不确定是否发送,
如果需要继续生产数据,就可以选择buffer清空,或者不清空 [消息不丢失,一般会设置不清空]
同步模式和异步模式:[异步就是没有设置缓冲池 buffer]
在同步模式下:
1. 生产者等待10s,如果broker没有给出ack响应,就认为失败。
2. 生产者重置3次,如果还没响应,就报错。
在异步模式下:[认为设置]
1. 现将数据保存在生产者端的buffer中。buffer大小是2万条
2. 满足数据阈值或者数量阈值其中的一个条件就可以发送数据
3. 发送一批数据的大小是500条
如果broker迟迟不给ack,而buffer又满了,开发者自己设置是否直接清空buffer中的数据。
2.broker端消息不丢失机制:
broker端的消息不丢失,其实就是用partition副本机制来保证的。
3.consumer端消息不丢失机制:
partition中有个segment段,log和index文件,log文件存放的是消息本身。
index文件存放的是消息offset值和存放在log文件的哪儿文件。
问题:[在kafka 0.8版本之前consumer消费数据的offset值是保存在zookeeper上的,但是
这样会导致一种现象,就是consumer已经消费完了,但是等还没把offset值保存到zookeeper
上的时候,consumer挂了,再次重启后,那么就会出现**重复**消费的问题]
解决:
kafka从0.8版本以后,offset的值是保存在了kafka的内置topic上,
这样就不会造成重复消费的问题了
补充:
Consumer Group [CG]:
kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段
一个topic可以有多个consumer group。topic的消息会复制(不是真的复制,只是概念上的)到所有的CG
但每个partition只会把消息发给该CG中的一个consumer。
用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic
broker:
一台kafka服务器就是一个broker,一个集群由多个broker组成。
一个broker可以容纳多个topic。
Partition:
为了负债均衡
kafka只保证按照一个partition中的顺序将消息发给consumer,
不保证一个topic的整体(多个partition)的顺序。
leader:
每一个replication集合中的partition都会选出一个唯一的leader,所有的读写请求都是由leader处理,
其他的replicas从leader处把数据更新同步到本地。每个cluster当中会选举出一个broker来担任controller,
负责处理partition的leader选举,协调partition迁移等工作。
ISR(In-Sync-Replica):
是Replicas的一个子集,表示目前Alive且与leader能够‘catch-up’的replicas集合。
由于读写都是首先落到leader上,
所以一般来说通过同步机制从leader上拉取数据的replica都会和leader有一些延迟
[包括延迟时间和延迟条数2个维度]
任意一个超过阈值都会把该replica提出ISR。每个Partition都有他自己独立的ISR。
配置文件当中配置的kafka多久删除数据
The minimum age of a log file to be eligible for deletion
log.retention.hours=168
定时检查周期,发现数据存了超过上面配置的时间,就干数据
log.retention.check.interval.ms=30000